package com.boat.hbase.util;

import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Resource;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Convenience wrapper around the HBase client {@link Table} API.
 * <p>
 * Every operation goes through {@link #execute(String, TableCallback)}, which
 * obtains a {@link Table} from the injected connection manager, runs a
 * callback against it and closes the table afterwards. Any failure is
 * surfaced to the caller as a {@link RuntimeException}.
 * <p>
 * Created 2017-12-27 09:23:20.
 *
 * @author boat
 * @version V1.0
 */
@Component("hbaseTemplate")
public class HbaseTemplate {

	private static final Logger logger = LoggerFactory.getLogger(HbaseTemplate.class);

	@Resource
	private HbaseConectionManager hbaseConectionManager;

	/** Name of the charset used to encode row keys passed in as strings. */
	public static final String charSet = "UTF-8";

	/**
	 * Runs {@code action} against the named table and closes the table afterwards.
	 * <p>
	 * The connection itself is obtained from the connection manager and is not
	 * closed here. A failure while closing the table is logged and otherwise
	 * ignored so that it never hides the result (or the failure) of the action.
	 *
	 * @param tableName
	 *            name of the HBase table to operate on
	 * @param action
	 *            callback that receives the opened table
	 * @param <T>
	 *            type of the value produced by the callback
	 * @return whatever the callback returns
	 * @throws RuntimeException
	 *             wrapping any {@link Throwable} raised while obtaining the
	 *             table or while running the callback
	 */
	public <T> T execute(String tableName, TableCallback<T> action) {
		Table table = null;
		try {
			Connection conn = hbaseConectionManager.getConnection();
			table = conn.getTable(TableName.valueOf(tableName));
			return action.doInTable(table);
		} catch (Throwable th) {
			throw new RuntimeException(th);
		} finally {
			// table is still null when getConnection()/getTable() failed; closing it
			// unconditionally would raise a NullPointerException that replaces the
			// real failure.
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					logger.error("hbase关闭table失败！", e);
				}
			}
		}
	}

	/**
	 * Writes a single {@link Put} to the table.
	 *
	 * @param tableName
	 *            table name
	 * @param put
	 *            the mutation to store
	 * @throws RuntimeException
	 *             if the write fails
	 */
	public void save(String tableName, final Put put) {
		this.execute(tableName, new TableCallback<Object>() {
			@Override
			public Object doInTable(Table table) {
				try {
					table.put(put);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				return null;
			}
		});
	}

	/**
	 * Writes a list of {@link Put}s to the table in one client call.
	 *
	 * @param tableName
	 *            table name
	 * @param puts
	 *            the mutations to store
	 * @throws RuntimeException
	 *             if the write fails
	 */
	public void saveBatch(String tableName, final List<Put> puts) {
		this.execute(tableName, new TableCallback<Object>() {
			@Override
			public Object doInTable(Table table) {
				try {
					table.put(puts);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				return null;
			}
		});
	}

	/**
	 * Deletes the row with the given key.
	 *
	 * @param tableName
	 *            table name
	 * @param rowkey
	 *            row key, encoded with {@link #charSet}
	 * @throws RuntimeException
	 *             if the delete fails
	 */
	public void delete(String tableName, final String rowkey) {
		this.execute(tableName, new TableCallback<Object>() {
			@Override
			public Object doInTable(Table htable) throws IOException {
				Delete delete = new Delete(rowkey.getBytes(charSet));
				try {
					htable.delete(delete);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				return null;
			}
		});
	}

	/**
	 * Deletes the rows with the given keys in one client call.
	 *
	 * @param tableName
	 *            table name
	 * @param rowkeys
	 *            row keys, each encoded with {@link #charSet}
	 * @throws RuntimeException
	 *             if the delete fails
	 */
	public void deleteBatch(String tableName, final List<String> rowkeys) {
		this.execute(tableName, new TableCallback<Object>() {
			@Override
			public Object doInTable(Table htable) throws IOException {
				List<Delete> deletes = new ArrayList<Delete>(rowkeys.size());
				for (String rowkey : rowkeys) {
					deletes.add(new Delete(rowkey.getBytes(charSet)));
				}
				try {
					htable.delete(deletes);
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				return null;
			}
		});
	}

	/**
	 * Runs a scan and returns every row as a map of column qualifier to value.
	 * <p>
	 * Each map also carries the row key under the key {@code "rowKey"}. Values
	 * and the row key are decoded with {@link Bytes#toString(byte[])}. The map
	 * is keyed by qualifier only, so cells that share a qualifier (including a
	 * qualifier literally named {@code "rowKey"}) overwrite earlier entries.
	 * Results that contain no cells are skipped.
	 *
	 * @param tableName
	 *            table name
	 * @param scan
	 *            the scan to execute
	 * @return one map per returned row, in scan order; never {@code null}
	 * @throws RuntimeException
	 *             if the scan fails
	 */
	public List<Map<String, Object>> find(String tableName, final Scan scan) {
		return this.execute(tableName, new TableCallback<List<Map<String, Object>>>() {
			@Override
			public List<Map<String, Object>> doInTable(Table table) throws Throwable {
				ResultScanner resultScanner = table.getScanner(scan);
				try {
					List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
					for (Result result : resultScanner) {
						// listCells() is documented to return null for an empty Result.
						List<Cell> cells = result.listCells();
						if (cells == null) {
							continue;
						}
						Map<String, Object> row = new HashMap<String, Object>();
						// Store the row key once per row, before the columns.
						row.put("rowKey", Bytes.toString(result.getRow()));
						for (Cell cell : cells) {
							String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
							String value = Bytes.toString(CellUtil.cloneValue(cell));
							row.put(qualifier, value);
						}
						rows.add(row);
					}
					return rows;
				} finally {
					resultScanner.close();
				}
			}
		});
	}

	/**
	 * Runs a scan and maps every row onto a new instance of {@code clazz}.
	 * <p>
	 * For each row a map of column qualifier to raw bytes is built, with the
	 * row key stored under {@code "rowKey"}. Every JavaBean property of
	 * {@code clazz} whose name matches a map key and that has a setter is then
	 * populated with the value produced by
	 * {@code HBytesUtil.toObject(bytes, propertyTypeName)}. Properties without a
	 * setter, and entries whose bytes are {@code null}, are left untouched.
	 * Results that contain no cells are skipped.
	 *
	 * @param tableName
	 *            table name
	 * @param scan
	 *            the scan to execute
	 * @param clazz
	 *            bean class to instantiate; needs an accessible no-argument
	 *            constructor
	 * @return one bean per returned row, in scan order; never {@code null}
	 * @throws RuntimeException
	 *             if the scan, the instantiation or a property assignment fails
	 */
	public List<Object> findObjects(String tableName, final Scan scan, final Class<?> clazz) {
		return this.execute(tableName, new TableCallback<List<Object>>() {
			@Override
			public List<Object> doInTable(Table table) throws Throwable {
				// The bean metadata does not depend on the row, so resolve it once.
				BeanInfo beanInfo = Introspector.getBeanInfo(clazz);
				PropertyDescriptor[] descriptors = beanInfo.getPropertyDescriptors();

				ResultScanner resultScanner = table.getScanner(scan);
				try {
					List<Object> beans = new ArrayList<Object>();
					for (Result result : resultScanner) {
						// listCells() is documented to return null for an empty Result.
						List<Cell> cells = result.listCells();
						if (cells == null) {
							continue;
						}
						Map<String, byte[]> rawValues = new HashMap<String, byte[]>();
						rawValues.put("rowKey", result.getRow());
						for (Cell cell : cells) {
							String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
							rawValues.put(qualifier, CellUtil.cloneValue(cell));
						}

						Object bean = clazz.getDeclaredConstructor().newInstance();
						for (PropertyDescriptor descriptor : descriptors) {
							byte[] raw = rawValues.get(descriptor.getName());
							if (raw == null) {
								continue;
							}
							// Read-only properties (e.g. "class") have no setter, and
							// purely indexed properties report no property type.
							Method writer = descriptor.getWriteMethod();
							Class<?> propertyType = descriptor.getPropertyType();
							if (writer == null || propertyType == null) {
								continue;
							}
							// A failing assignment aborts the whole call; wrap this in a
							// try/catch if one bad property should not affect the others.
							Object converted = HBytesUtil.toObject(raw, propertyType.getName());
							writer.invoke(bean, new Object[] { converted });
						}
						beans.add(bean);
					}
					return beans;
				} finally {
					resultScanner.close();
				}
			}
		});
	}
}
